/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/******************************************************************************
 * $Id$
 *
 *****************************************************************************/

/** @file umsocketselector.cpp
 * Used to iterate through available socket/port connections for a given UM,
 * when sending response messages back to the UM.
 */

#include "umsocketselector.h"
#ifdef _MSC_VER
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#else
#include <arpa/inet.h>
#include <sys/socket.h>
#endif
#include <iostream>
#include <climits>
#include <sstream>
#include <cstring>
#include <limits>

#include "liboamcpp.h"
using namespace oam;

//#define NDEBUG
#include <cassert>

//#define LOAD_MODULE_DEBUG 1
//#define MOD_CONN_DEBUG    1
//#define SEL_CONN_DEBUG    1

namespace primitiveprocessor
{
// Sentinel value of UmIPSocketConns::fNextIOSocketIdx used when the IP address
// has no socket/port connections left to choose from.
/*static*/ const int32_t UmIPSocketConns::NEXT_IOSOCKET_UNASSIGNED = -1;
// Sentinel value of UmModuleIPs::fNextUmIPSocketIdx meaning that no IP address
// is currently selected for the next send.
/*static*/ const int32_t UmModuleIPs::NEXT_IP_SOCKET_UNASSIGNED    = -1;
// Singleton instance pointer; stays 0 until the first call to instance().
/*static*/ UmSocketSelector* UmSocketSelector::fpUmSocketSelector  =  0;

//------------------------------------------------------------------------------
// UmSocketSelector methods
//------------------------------------------------------------------------------
// Singleton accessor.  The UmSocketSelector object is created on the first
// call; every later call hands back that same pointer.  No locking is done
// here.
// return - pointer to the one UmSocketSelector instance
//------------------------------------------------------------------------------
/* static */ UmSocketSelector*
UmSocketSelector::instance()
{
    //..Lazily build the selector the first time anybody asks for it
    if (!fpUmSocketSelector)
        fpUmSocketSelector = new UmSocketSelector();

    return fpUmSocketSelector;
}

//------------------------------------------------------------------------------
// UmSocketSelector constructor.  Loads the UM module/IP address information
// as part of construction.
//------------------------------------------------------------------------------
UmSocketSelector::UmSocketSelector()
{
    //..Populate the UM and IP address tables up front
    this->loadUMModuleInfo();
}

//------------------------------------------------------------------------------
// Totals the IP address counts reported by each of the known UM modules.
// return - uint32_t; sum of ipAddressCount() over every entry in fUmModuleIPs
//------------------------------------------------------------------------------
uint32_t
UmSocketSelector::ipAddressCount() const
{
    uint32_t total = 0;

    //..Accumulate the per-UM counts, walking the UM list front to back
    unsigned int um = 0;

    while (um < fUmModuleIPs.size())
    {
        total += fUmModuleIPs[um]->ipAddressCount();
        ++um;
    }

    return total;
}

//------------------------------------------------------------------------------
// Loads the UM module information from the Columnstore.xml file, so that we
// have the list of IP addresses that are valid for each UM.  The "um" module
// type configuration is fetched through Oam; for every module in its network
// list, each host IP address that parses successfully is recorded in
// fIpAddressUmMap (IP address -> UM index) and added to that UM's entry in
// fUmModuleIPs.  An IP address that fails to parse is reported on stderr and
// skipped.
// Note that this method does not ensure thread safety, but that's okay because
// it is assumed that it will only be called once from the first call to
// instance().
//------------------------------------------------------------------------------
void
UmSocketSelector::loadUMModuleInfo()
{
    Oam                 oam;
    ModuleTypeConfig    moduleTypeConfig;
    const std::string   UM_MODTYPE("um");

    oam.getSystemConfig(UM_MODTYPE, moduleTypeConfig);

    const int moduleCount = moduleTypeConfig.ModuleCount;

#ifdef LOAD_MODULE_DEBUG
    std::cout << "ModuleConfig for type: " << UM_MODTYPE         << std::endl;
    std::cout << "ModuleDesc  = " << moduleTypeConfig.ModuleDesc << std::endl;
    std::cout << "ModuleCount = " << moduleCount                 << std::endl;
    std::cout << "RunType     = " << moduleTypeConfig.RunType    << std::endl;
#endif

    //..Nothing to register when the configuration reports no UM modules
    if ( moduleCount <= 0 )
        return;

    //..Loop through the list of UM modules
    for (DeviceNetworkList::iterator iter1 =
                moduleTypeConfig.ModuleNetworkList.begin();
            (iter1 != moduleTypeConfig.ModuleNetworkList.end());
            ++iter1)
    {
        std::string moduleName = iter1->DeviceName;

#ifdef LOAD_MODULE_DEBUG
        std::cout << "ModuleName-" << moduleName << std::endl;
#endif

        //..Assign the UM index based on whether it is a new UM or one
        //  we have seen before
        unsigned int umIdx = findOrAddUm( moduleName );

        //..Get the list of IP addresses (NIC's) for this UM module
        for (HostConfigList::iterator iter2 = iter1->hostConfigList.begin();
                (iter2 != iter1->hostConfigList.end());
                ++iter2)
        {
            std::string ipAddr = iter2->IPAddr;

#ifdef LOAD_MODULE_DEBUG
            std::cout << "  NIC-" << iter2->NicID <<
                      "; host-" << iter2->HostName <<
                      "; IP-"   << ipAddr          << std::endl;
#endif

            struct in_addr ip;

            //..inet_aton() returns nonzero only for a valid dotted address;
            //  ip.s_addr is then in network byte order.
            if ( inet_aton(ipAddr.c_str(), &ip ) )
            {
                fIpAddressUmMap[ ip.s_addr ] = umIdx;
                fUmModuleIPs[umIdx]->addIP( ip.s_addr );
            }
            else
            {
                std::cerr << "Invalid IP address in SystemModuleConfig "
                          "section: " << ipAddr << std::endl;
            }
        } // loop through the IP addresses for a UM module
    } // loop through the list of UM modules
}

//------------------------------------------------------------------------------
// Look up moduleName in fUmModuleIPs.  If an entry with that module name is
// already present its index is returned; otherwise a new UmModuleIPs entry is
// appended for moduleName and the index of that new entry is returned.
// moduleName (in) - name of the UM module to find or register
// return - unsigned int for the index into fUmModuleIPs for moduleName.
//------------------------------------------------------------------------------
unsigned int
UmSocketSelector::findOrAddUm( const std::string& moduleName )
{
    //..Reuse the entry of a UM module we have already registered
    unsigned int slot = 0;

    while (slot < fUmModuleIPs.size())
    {
        if (fUmModuleIPs[slot]->moduleName() == moduleName)
            return slot;

        ++slot;
    }

    //..First time we have seen this UM module; append it, it is now last
    fUmModuleIPs.push_back( SP_UM_MODIPS(new UmModuleIPs(moduleName)) );

    return static_cast<unsigned int>( fUmModuleIPs.size() - 1 );
}

//------------------------------------------------------------------------------
// Add a new socket/port connection.  It will be grouped with other
// socket/port connections belonging to the same UM.
// ios (in)       - socket/port connection to be added
// writeLock (in) - mutex to use when writing to ios.
// return         - boolean indicating whether socket/port connection was added.
//------------------------------------------------------------------------------
bool
UmSocketSelector::addConnection(
    const SP_UM_IOSOCK& ios,
    const SP_UM_MUTEX&  writeLock )
{
    bool bConnAdded = false;

    sockaddr sa = ios->sa();
    const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
    IpAddressUmMap_t::iterator mapIter =
        fIpAddressUmMap.find ( sinp->sin_addr.s_addr );

    // Add this socket/port connection to the UM connection list it belongs to.
    if ( mapIter != fIpAddressUmMap.end() )
    {
        unsigned int umIdx = mapIter->second;
        bConnAdded = fUmModuleIPs[umIdx]->addSocketConn( ios, writeLock );
    }

    if (!bConnAdded)
    {
#ifdef SEL_CONN_DEBUG
        std::ostringstream oss;
        oss << "No UM/IP match found to add connection " << ios->toString() <<
            std::endl;
        std::cout << oss.str();
#endif
    }

    return bConnAdded;
}

//------------------------------------------------------------------------------
// Delete a socket/port connection from the UM for which it belongs.
// ioSock (in) - socket/port connection to be deleted
//------------------------------------------------------------------------------
void
UmSocketSelector::delConnection( const IOSocket& ios )
{
    sockaddr sa = ios.sa();
    const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
    IpAddressUmMap_t::iterator mapIter =
        fIpAddressUmMap.find ( sinp->sin_addr.s_addr );

    if ( mapIter != fIpAddressUmMap.end() )
    {
        unsigned int umIdx = mapIter->second;
        fUmModuleIPs[umIdx]->delSocketConn( ios );
    }
}

//------------------------------------------------------------------------------
// Get the next socket/port connection belonging to the same UM as ios.
// The selected socket/port connection will be returned in outIos.  It can
// then be used for sending the next response message back to the applicable
// UM module.
// ios (in)        - socket/port connection where a UM request originated from
// outIos (out)    - socket/port connection to use in sending the
//                   corresponding response
// writelock (out) - mutex lock to be used when writing to outIos
// return          - bool indicating if socket/port connection was assigned to
//                   outIos
//------------------------------------------------------------------------------
bool
UmSocketSelector::nextIOSocket(
    const IOSocket& ios,
    SP_UM_IOSOCK&   outIos,
    SP_UM_MUTEX&    writeLock )
{
    sockaddr sa = ios.sa();
    const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
    IpAddressUmMap_t::iterator mapIter =
        fIpAddressUmMap.find ( sinp->sin_addr.s_addr );

    if ( mapIter != fIpAddressUmMap.end() )
    {
        unsigned int umIdx = mapIter->second;

        if (fUmModuleIPs[umIdx]->nextIOSocket( outIos, writeLock ))
        {

#ifdef SEL_CONN_DEBUG
            std::ostringstream oss;
            oss << "UM " << fUmModuleIPs[umIdx]->moduleName() <<
                "; in: " << ios.toString() <<
                "; selected out: " << outIos->toString() << std::endl;
            std::cout << oss.str();
#endif

            return true;
        }
    }

    //..This should not happen.  Application is asking for next socket/port for
    //  a connection not in our UM module list.
    return false;
}

//------------------------------------------------------------------------------
// Convert contents to string for logging, debugging, etc.
//------------------------------------------------------------------------------
const std::string
UmSocketSelector::toString() const
{
    std::ostringstream oss;

    oss << "IP Address to UM index map:" << std::endl;

    for (IpAddressUmMap_t::const_iterator mapIter = fIpAddressUmMap.begin();
            (mapIter != fIpAddressUmMap.end());
            ++mapIter)
    {
        char ipString[INET_ADDRSTRLEN];
        oss << "  IPAddress: " <<
            UmIPSocketConns::nwToString(mapIter->first, ipString) <<
            " maps to UM: "  << mapIter->second << std::endl;
    }

    for (unsigned int i = 0; i < fUmModuleIPs.size(); ++i)
    {
        oss << std::endl << fUmModuleIPs[i]->toString();
    }

    return oss.str();
}

//------------------------------------------------------------------------------
// UmModuleIPs methods
//------------------------------------------------------------------------------
// Add an IP address to be associated with this UM module.
// ip (in) - IP address to associate with this UM module (in network byte order)
//------------------------------------------------------------------------------
void
UmModuleIPs::addIP( in_addr_t ip )
{
    boost::mutex::scoped_lock lock( fUmModuleMutex );

#ifdef MOD_CONN_DEBUG
    std::ostringstream oss;
    char ipString[INET_ADDRSTRLEN];
    oss << "    UM " << fUmModuleName << "; adding IP: " <<
        UmIPSocketConns::nwToString(ip, ipString) << std::endl;
    std::cout << oss.str();
#endif

    fUmIPSocketConns.push_back( SP_UM_IPCONNS(new UmIPSocketConns(ip)) );
}

//------------------------------------------------------------------------------
// Add a new socket/port connection to this UM.  It will be grouped with other
// socket/port connections having the same IP address for this UM: the
// connection is stored under the first entry in fUmIPSocketConns whose IP
// address equals the one from ioSock->sa().  fUmModuleMutex is held while the
// IP list is searched and updated.
// ioSock (in)    - socket/port connection to be added
// writeLock (in) - mutex to use when writing to ioSock.
// return         - boolean indicating whether socket/port connection was added
//                  (false when no IP address of this UM matches ioSock).
//------------------------------------------------------------------------------
bool
UmModuleIPs::addSocketConn(
    const SP_UM_IOSOCK& ioSock,
    const SP_UM_MUTEX&  writeLock )
{
    //..The socket's address is the same for every comparison below, so read
    //  it once, before taking the lock, rather than on each loop iteration.
    sockaddr sa = ioSock->sa();
    const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);

    boost::mutex::scoped_lock lock( fUmModuleMutex );

    for (unsigned int i = 0; i < fUmIPSocketConns.size(); ++i)
    {
        if (fUmIPSocketConns[i]->ipAddress() != sinp->sin_addr.s_addr)
            continue;

#ifdef MOD_CONN_DEBUG
        std::ostringstream oss;
        oss << "UM " << fUmModuleName << "; adding connection " <<
            ioSock->toString() << std::endl;
        std::cout << oss.str();
#endif

        fUmIPSocketConns[i]->addSocketConn ( ioSock, writeLock );

        //..Initialize fNextUmIPSocketIdx if no IP is currently selected,
        //  e.g. this is the first socket/port connection for this UM.
        if ( fNextUmIPSocketIdx == NEXT_IP_SOCKET_UNASSIGNED)
            fNextUmIPSocketIdx = i;

        return true;
    }

    //..None of this UM's IP addresses matched the socket's IP address
    return false;
}

//------------------------------------------------------------------------------
// Delete a socket/port connection from this UM.  The connection is removed
// from the first entry in fUmIPSocketConns whose IP address equals the one
// from ioSock.sa(); if that leaves the IP with no connections, the "next IP"
// index is advanced.  fUmModuleMutex is held while the IP list is searched
// and updated.
// ioSock (in) - socket/port connection to be deleted
//------------------------------------------------------------------------------
void
UmModuleIPs::delSocketConn( const IOSocket& ioSock )
{
    //..The socket's address is the same for every comparison below, so read
    //  it once, before taking the lock, rather than on each loop iteration.
    sockaddr sa = ioSock.sa();
    const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);

    boost::mutex::scoped_lock lock( fUmModuleMutex );

    for (unsigned int i = 0; i < fUmIPSocketConns.size(); ++i)
    {
        if (fUmIPSocketConns[i]->ipAddress() != sinp->sin_addr.s_addr)
            continue;

#ifdef MOD_CONN_DEBUG
        std::ostringstream oss;
        oss << "UM " << fUmModuleName << "; deleting connection " <<
            ioSock.toString() << std::endl;
        std::cout << oss.str();
#endif

        fUmIPSocketConns[i]->delSocketConn ( ioSock );

        //..If this IP now has no connections, then advance
        //  fNextUmIPSocketIdx to an IP address having connections.
        if (fUmIPSocketConns[i]->count() == 0)
        {
            advanceToNextIP();
        }

        //..Only the first matching IP address is processed
        return;
    }
}

//------------------------------------------------------------------------------
// Hand out the next socket/port connection for this UM to use in sending a
// message to the applicable UM module.  The connection is taken from the IP
// address that fNextUmIPSocketIdx refers to, after which the index is advanced
// to the next IP address.  fUmModuleMutex is held for the duration of the call.
// outIos (out)    - socket/port connection to use in sending next msg
// writelock (out) - mutex lock to be used when writing to outIos
// return          - bool indicating if socket/port connection was assigned to
//                   outIos; false if this UM has no IP addresses or no IP
//                   address is currently selected.
//------------------------------------------------------------------------------
bool
UmModuleIPs::nextIOSocket( SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock )
{
    boost::mutex::scoped_lock guard( fUmModuleMutex );

    //..Nothing to hand out without an IP list and a selected IP
    if ( fUmIPSocketConns.empty() ||
            (fNextUmIPSocketIdx == NEXT_IP_SOCKET_UNASSIGNED) )
    {
        return false;
    }

    assert (fNextUmIPSocketIdx < static_cast<int>(fUmIPSocketConns.size()));

    //..Take a connection from the selected IP, then move on to the next IP
    fUmIPSocketConns[fNextUmIPSocketIdx]->nextIOSocket( outIos, writeLock );
    advanceToNextIP();

    return true;
}

//------------------------------------------------------------------------------
// Advance to the "next" available IP (for this UM), skipping over
// any IP's that have 0 socket/port connections.  When this UM has more than
// one IP, the search starts just past the current IP, wraps around to the
// start of the list, and finishes by re-examining the current IP itself, so
// that an IP which is the only one with connections stays selected.
// fNextUmIPSocketIdx is set to NEXT_IP_SOCKET_UNASSIGNED only when none of the
// IP's examined has a connection.
// No mutex locking done, as the calling methods will have already locked a
// mutex.
//------------------------------------------------------------------------------
void
UmModuleIPs::advanceToNextIP()
{
    const int ipCount = static_cast<int>(fUmIPSocketConns.size());

    if ( ipCount > 1 )
    {
        //..Search to end of IP list for an IP having 1 or more connections
        for (int i = fNextUmIPSocketIdx + 1; i < ipCount; ++i)
        {
            if (fUmIPSocketConns[i]->count() > 0)
            {
                fNextUmIPSocketIdx = i;
                return;
            }
        }

        //..Wrap back around to the start of the list, to continue the search.
        //  The range includes the current index: if the current IP is the
        //  only one with connections it must remain selected, rather than
        //  leaving this UM with no selected IP while connections still exist.
        //  The extra bound check keeps the index inside the list.
        for (int i = 0; (i <= fNextUmIPSocketIdx) && (i < ipCount); ++i)
        {
            if (fUmIPSocketConns[i]->count() > 0)
            {
                fNextUmIPSocketIdx = i;
                return;
            }
        }

        //..Every IP was examined and none of them has a connection
        fNextUmIPSocketIdx = NEXT_IP_SOCKET_UNASSIGNED;
    }
    else // special logic to handle 0 fUmIPSocketConns, or a single empty one
    {
        if ((ipCount == 0) ||
                (fUmIPSocketConns[0]->count() == 0))
        {
            fNextUmIPSocketIdx = NEXT_IP_SOCKET_UNASSIGNED;
        }
    }
}

//------------------------------------------------------------------------------
// Render this UM module's state as text for logging, debugging, etc.  The
// module name and "next IP" index come first, followed by the toString()
// output of each IP address entry.  fUmModuleMutex is held while the state is
// read.
// return - std::string holding the formatted state
//------------------------------------------------------------------------------
const std::string
UmModuleIPs::toString()
{
    std::ostringstream text;

    boost::mutex::scoped_lock guard( fUmModuleMutex );

    text << "UM module name: " << fUmModuleName
         << "; nextUmIPIdx: " << fNextUmIPSocketIdx << '\n';

    //..Append the details of every IP address belonging to this UM
    unsigned int ipIdx = 0;

    while (ipIdx < fUmIPSocketConns.size())
    {
        text << fUmIPSocketConns[ipIdx]->toString();
        ++ipIdx;
    }

    return text.str();
}

//------------------------------------------------------------------------------
// UmIPSocketConns methods
//------------------------------------------------------------------------------
// Add the specified socket/port to the connection list for the IP address
// represented by this UmIPSocketConns object.
// ioSock (in)    - socket/port connection to be added
// writeLock (in) - mutex to use when writing to ioSock
//------------------------------------------------------------------------------
void
UmIPSocketConns::addSocketConn(
    const SP_UM_IOSOCK& ioSock,
    const SP_UM_MUTEX&  writeLock )
{
    UmIOSocketData sockData =
    {
        SP_UM_IOSOCK(ioSock), SP_UM_MUTEX(writeLock)
    };
    fIOSockets.push_back( sockData );

    //..Initialize fNextIOSocketIdx when we add first connection
    if (fIOSockets.size() == 1)
        fNextIOSocketIdx = 0;
}

//------------------------------------------------------------------------------
// Delete the specified socket/port from the connection list for the IP address
// referenced by this UmIPSocketConns object.  The first stored connection whose
// port equals the port from ioSock.sa() is removed, and fNextIOSocketIdx is
// adjusted so that it still refers to a valid entry (or is reset to
// NEXT_IOSOCKET_UNASSIGNED when the list becomes empty).  If no stored
// connection has that port, nothing is changed.
// ioSock (in) - socket/port connection to be deleted
//
// Not normally a good thing to be using a std::vector if we are going to be
// deleting elements in the middle of the collection.  Very inefficient.
// But this method won't be called often, and we are only dealing with a small
// collection.  Plus we want to use a vector over a list, so that nextIOSocket()
// can benefit from quick random access.
//------------------------------------------------------------------------------
void
UmIPSocketConns::delSocketConn( const IOSocket& ioSock )
{
    //..The port being searched for is the same for every comparison, so read
    //  the socket's address once instead of on each loop iteration.
    sockaddr targetSa = ioSock.sa();
    const sockaddr_in* target = reinterpret_cast<const sockaddr_in*>(&targetSa);

    for (unsigned int i = 0; i < fIOSockets.size(); ++i)
    {
        sockaddr candSa = fIOSockets[i].fSock->sa();
        const sockaddr_in* cand = reinterpret_cast<const sockaddr_in*>(&candSa);

        if (cand->sin_port != target->sin_port)
            continue;

        fIOSockets.erase ( fIOSockets.begin()  + i );

        //..Adjust fNextIOSocketIdx
        //  1.  reset fNextIOSocketIdx to -1 if we have no more connections
        //  2a. decrement if fNextIOSocketIdx is after deleted connection
        //  2b. reset to start of vector if we deleted the last connection
        if (fIOSockets.empty())
        {
            fNextIOSocketIdx = NEXT_IOSOCKET_UNASSIGNED;
        }
        else
        {
            if (fNextIOSocketIdx > static_cast<int>(i))
                fNextIOSocketIdx--;

            if ( fNextIOSocketIdx >= static_cast<int>(fIOSockets.size()) )
                fNextIOSocketIdx = 0;
        }

        //..Only the first connection with a matching port is removed
        return;
    }
}

//------------------------------------------------------------------------------
// Hand out the next socket/port connection for this IP to use in sending a
// message to the applicable UM module, then step fNextIOSocketIdx to the
// following connection.  The asserts require that at least one connection is
// stored and that fNextIOSocketIdx refers to one of them.
// outIos (out)    - socket/port connection to use in sending next msg
// writelock (out) - mutex lock to be used when writing to outIos
//------------------------------------------------------------------------------
void
UmIPSocketConns::nextIOSocket ( SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock )
{
    assert (fIOSockets.size() > 0);
    assert (fNextIOSocketIdx != NEXT_IOSOCKET_UNASSIGNED);
    assert (fNextIOSocketIdx  < static_cast<int>(fIOSockets.size()));

    //..Return the socket and its write mutex as a pair
    outIos    = fIOSockets[fNextIOSocketIdx].fSock;
    writeLock = fIOSockets[fNextIOSocketIdx].fMutex;

    //..Step to the following connection, wrapping around to the start
    //  whenever we run off the end of the vector.
    const int following = fNextIOSocketIdx + 1;
    const int connCount = static_cast<int>(fIOSockets.size());

    fNextIOSocketIdx = (following >= connCount) ? 0 : following;
}

//------------------------------------------------------------------------------
// Convert network byte ordered IPv4 address to a string.  When the conversion
// fails, or when built with _MSC_VER defined (where no conversion is
// attempted), the text "unknown" is stored instead.
// addr (in)      - IP address in network byte order
// ipString (out) - caller supplied buffer that receives the text; up to
//                  INET_ADDRSTRLEN bytes are written to it
// return - char* is returned with the IP address string (same as ipString).
//------------------------------------------------------------------------------
/* static */ char*
UmIPSocketConns::nwToString( in_addr_t addr, char* ipString )
{
    in_addr addrStruct = { addr };
    const char* converted = 0;

#ifndef _MSC_VER
    converted = inet_ntop(AF_INET, &addrStruct, ipString, INET_ADDRSTRLEN);
#endif

    //..Fall back to a fixed placeholder when there is no converted text
    if (converted == 0)
        strcpy(ipString, "unknown");

    return ipString;
}

//------------------------------------------------------------------------------
// Render this IP address entry as text for logging, debugging, etc.  The IP
// address and "next socket" index come first, followed by one line per stored
// connection giving its port in host byte order.
// return - std::string holding the formatted state
//------------------------------------------------------------------------------
const std::string
UmIPSocketConns::toString() const
{
    std::ostringstream text;

    char ipText[INET_ADDRSTRLEN];
    text << "  IPAddress: " << UmIPSocketConns::nwToString(fIpAddress, ipText)
         << "; nextIOSocketIdx: " << fNextIOSocketIdx << '\n';

    //..List the port of every connection stored for this IP address
    unsigned int conn = 0;

    while (conn < fIOSockets.size())
    {
        sockaddr connAddr = fIOSockets[conn].fSock->sa();
        const sockaddr_in* connIn =
            reinterpret_cast<const sockaddr_in*>(&connAddr);
        text << "    port: " << ntohs(connIn->sin_port) << '\n';
        ++conn;
    }

    return text.str();
}

} // end of primitiveprocessor namespace
